#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import base64
import time
import os
import subprocess

import parent
from testlib import *


@skipDistroPackage()
class TestConnection(MachineCase):

    def setUp(self):
        '''Run the base class setup, then record the path of the cockpit-ws binary'''
        super().setUp()
        libexecdir = self.libexecdir
        self.ws_executable = f"{libexecdir}/cockpit-ws"

    def ostree_setup_ws(self):
        '''Replace the cockpit/ws container with an overlaid cockpit-ws rpm

        Only acts on OSTree images; on every other image this is a no-op.
        Meant for tests which cannot work with the container, and to check
        that overlaying the cockpit-ws package works at all.
        The machine gets rebooted as part of this.
        '''
        machine = self.machine
        if machine.ostree_image:
            # remove the startup script of the cockpit/ws container
            machine.execute("rm /etc/systemd/system/cockpit.service")
            # layer the locally built cockpit-ws rpm on top of the image
            machine.execute("rpm-ostree install --cache-only /var/tmp/build-results/cockpit-ws-*.rpm")
            # reboot and wait until the machine is back
            machine.spawn("sync && sync && sync && sleep 0.1 && reboot", "reboot")
            machine.wait_reboot()

    @skipBrowser("Firefox cannot work with cookies", "firefox")
    def testBasic(self):
        # Runs the basic connection scenario against the default cockpit-ws setup and,
        # on OSTree images, a second time against an overlaid cockpit-ws rpm.
        machine = self.machine

        # default ws install first (container on OSTree, package everywhere else)
        self.check_basic_with_start_stop(machine.start_cockpit, machine.stop_cockpit)

        if not machine.ostree_image:
            return

        # OSTree only from here on: repeat the scenario with the overlaid cockpit-ws rpm

        def start_overlaid_ws():
            # drop-in which resets ExecStart= and re-adds the packaged ExecStart line with --no-tls
            machine.execute(r"""set -e;
                    mkdir -p /etc/systemd/system/cockpit.service.d/ &&
                    printf "[Service]\nExecStart=\n%s --no-tls" `grep ExecStart= /lib/systemd/system/cockpit.service` \
                            > /etc/systemd/system/cockpit.service.d/notls.conf
                    systemctl daemon-reload
                    systemctl start cockpit.socket""")

        def stop_overlaid_ws():
            machine.execute("systemctl stop cockpit cockpit.socket")

        self.ostree_setup_ws()
        # HACK: rpm-ostree install alone triggers a plethora of SELinux failures, so allow all of them
        machine.execute("setenforce 0")
        self.allow_journal_messages('audit.*avc:  denied .*')
        self.check_basic_with_start_stop(start_overlaid_ws, stop_overlaid_ws)

    def check_basic_with_start_stop(self, start_cockpit, stop_cockpit):
        '''Exercise login, disconnect and reconnect handling against a plain-HTTP cockpit

        start_cockpit and stop_cockpit are called without arguments to bring the
        web service up and down; testBasic passes either the machine's own
        start/stop methods or systemctl based replacements.

        The scenario stops the service and severs the connection with iptables,
        both on the login page and on a logged-in page, checks the attributes of
        the session cookie, and (except on OSTree) checks the login error messages
        for a non-executable cockpit-session and a missing cockpit-bridge. It ends
        by crashing systemd-hostnamed and checking that a core dump gets recorded.
        '''
        m = self.machine
        b = self.browser
        start_cockpit()

        # take cockpit-ws down on the login page
        b.open("/system")
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        stop_cockpit()
        b.click('#login-button')
        # any non-empty fatal message is accepted here
        b.wait_text_not('#login-fatal-message', "")
        start_cockpit()
        b.reload()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # cookie should not be marked as secure, it's not https
        cookie = b.cookie("cockpit")
        self.assertTrue(cookie["httpOnly"])
        self.assertEqual(cookie["sameSite"], "Strict")
        self.assertFalse(cookie["secure"])

        # take cockpit-ws down on the server page
        stop_cockpit()
        b.switch_to_top()
        b.wait_in_text(".curtains-ct h1", "Disconnected")

        start_cockpit()
        b.click("#machine-reconnect")
        b.expect_load()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")

        # sever the connection on the login page
        m.execute("iptables -w -I INPUT -p tcp --dport 9090 -j REJECT --reject-with tcp-reset")
        b.click('#login-button')
        with b.wait_timeout(20):
            b.wait_text_not('#login-fatal-message', "")
        # remove the reject rule again, so that the next login attempt can succeed
        m.execute("iptables -w -D INPUT -p tcp --dport 9090 -j REJECT --reject-with tcp-reset")
        b.reload()
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # sever the connection on the server page
        m.execute("iptables -w -I INPUT -p tcp --dport 9090 -j REJECT")
        b.switch_to_top()
        with b.wait_timeout(60):
            b.wait_visible(".curtains-ct")

        b.wait_in_text(".curtains-ct h1", "Disconnected")
        b.wait_in_text('.curtains-ct .pf-c-empty-state__body', "Connection has timed out.")
        m.execute("iptables -w -D INPUT -p tcp --dport 9090 -j REJECT")
        b.click("#machine-reconnect")
        b.expect_load()
        b.enter_page("/system")
        b.logout()

        # deleted cookie after logout should not be marked as secure, it's not https
        cookie = b.cookie("cockpit")
        self.assertEqual(cookie["value"], "deleted")
        self.assertTrue(cookie["httpOnly"])
        self.assertFalse(cookie["secure"])

        if not m.ostree_image:  # cannot write to /usr on OSTree, and cockpit-session is in a container
            # damage cockpit-session permissions, expect generic error message
            m.execute(f"chmod g-x {self.libexecdir}/cockpit-session")
            b.open("/system")
            b.wait_in_text('#login-fatal-message', "Internal error in login process")
            m.execute(f"chmod g+x {self.libexecdir}/cockpit-session")

            self.allow_journal_messages(".*cockpit-session: bridge program failed.*")

            # pretend cockpit-bridge is not installed, expect specific error message
            m.execute("mv /usr/bin/cockpit-bridge /usr/bin/cockpit-bridge.disabled")
            b.open("/system")
            b.wait_visible("#login")
            b.set_val("#login-user-input", "admin")
            b.set_val("#login-password-input", "foobar")
            b.click('#login-button')
            b.wait_visible('#login-fatal-message')
            b.wait_text('#login-fatal-message', "The cockpit package is not installed")
            m.execute("mv /usr/bin/cockpit-bridge.disabled /usr/bin/cockpit-bridge")

        self.allow_restart_journal_messages()

        # Let's crash a systemd-controlled process and see if we get a proper backtrace in the logs
        # This helps with debugging failures in the tests elsewhere
        m.execute("""mkdir -p /run/systemd/system/systemd-hostnamed.service.d
                     printf '[Service]\nLimitCORE=infinity\n' > /run/systemd/system/systemd-hostnamed.service.d/core.conf
                     systemctl daemon-reload
                     systemctl restart systemd-hostnamed
                     pkill -e -SEGV systemd-hostnam""")
        # poll the journal of the current boot until the core dump message shows up
        wait(lambda: m.execute("journalctl -b | grep 'Process.*systemd-hostnam.*of user.*dumped core.'"))

        # Make sure the core dumps exist in the directory, so we can download them
        cores = m.execute("find /var/lib/systemd/coredump -type f")
        self.assertNotEqual(cores, "")

        # the crash above was provoked on purpose
        self.allow_core_dumps = True
        self.allow_journal_messages(".*org.freedesktop.hostname1.*DBus.Error.NoReply.*")

    @skipImage("OSTree doesn't use systemd units", "fedora-coreos")
    def testUnitLifecycle(self):
        # Checks which cockpit systemd units are active after starting, using, restarting and
        # stopping cockpit, and after killing cockpit-tls; first in http-only, then in https mode.
        m = self.machine

        def expect_active(unit, is_active):
            '''Assert that systemctl reports unit as "active" (is_active true) or "inactive"

            Any other state reported by "systemctl is-active" fails the test.
            '''
            # is-active exits non-zero for inactive units, "|| true" keeps execute() from failing
            status = m.execute(f"systemctl is-active {unit} || true").strip()
            self.assertIn(status, ["active", "inactive"])
            if is_active:
                self.assertEqual(status, "active", f"{unit} is not active")
            else:
                self.assertEqual(status, "inactive", f"{unit} is active")

        def expect_actives(ws_socket, instance_sockets, http_instances, https_instances=0):
            '''Assert the state of all cockpit units at once

            ws_socket: whether cockpit.socket is expected to be active
            instance_sockets: whether the cockpit-wsinstance-http and -https-factory
                              sockets are expected to be active
            http_instances: names of the instances whose .service is expected to be
                            active; only "http" is looked at
            https_instances: exact number of cockpit-wsinstance-https@* units expected,
                             checked separately for services and for sockets
            '''
            expect_active("cockpit.socket", ws_socket)
            # http instances
            for instance in ["http"]:
                expect_active(f"cockpit-wsinstance-{instance}.socket", instance_sockets)
                expect_active(f"cockpit-wsinstance-{instance}.service", instance in http_instances)
            # number of https instances
            expect_active("cockpit-wsinstance-https-factory.socket", instance_sockets)
            for _type in ["service", "socket"]:
                out = m.execute(f"systemctl --no-legend -t {_type} list-units cockpit-wsinstance-https@*")
                # one line of list-units output per unit
                count = len(out.strip().splitlines())
                self.assertEqual(count, https_instances, out)

        # at the beginning, no cockpit related units are running
        expect_actives(False, False, [])

        # http only mode

        m.start_cockpit(tls=False)
        expect_actives(True, False, [])
        # the first request brings up the instance socket and the http instance
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))

        expect_actives(True, True, ["http"])
        self.assertRaises(subprocess.CalledProcessError, m.execute,
                          "curl --silent https://127.0.0.1:9090")
        # c-tls knows it can't do https, and not activate that instance
        expect_actives(True, True, ["http"])

        m.restart_cockpit()
        expect_actives(True, True, ["http"])

        m.stop_cockpit()
        expect_actives(False, False, [])

        # cleans up also when cockpit-tls crashes or idle-exits, not just by explicit stop request
        m.start_cockpit(tls=False)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))
        m.execute("pkill -e cockpit-tls")
        expect_actives(True, False, [])

        # and recovers from that
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))
        expect_actives(True, True, ["http"])

        # https mode

        m.start_cockpit(tls=True)
        expect_actives(True, False, [], 0)

        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))

        # a plain http request does not create an https instance, the first https request does
        expect_actives(True, True, ["http"], 0)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent -k --head https://127.0.0.1:9090"))
        expect_actives(True, True, ["http"], 1)

        m.restart_cockpit()
        expect_actives(True, True, ["http"], 1)

        m.stop_cockpit()
        expect_actives(False, False, [], 0)

        m.start_cockpit(tls=True)
        expect_actives(True, False, [], 0)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))

        expect_actives(True, True, ["http"], 0)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent -k --head https://127.0.0.1:9090"))
        expect_actives(True, True, ["http"], 1)

        # cleans up also when cockpit-tls crashes or idle-exits, not just by explicit stop request
        m.execute("pkill -e cockpit-tls")
        expect_actives(True, False, [], 0)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9090"))
        expect_actives(True, True, ["http"], 0)
        # next https request after crash doesn't leak an instance
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent -k --head https://127.0.0.1:9090"))
        expect_actives(True, True, ["http"], 1)

        # instance service+socket going away does not confuse cockpit-tls' bookkeeping
        m.execute("systemctl stop cockpit-wsinstance-https@*.service cockpit-wsinstance-https@*.socket")
        expect_actives(True, True, ["http"], 0)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --show-error -k --head https://127.0.0.1:9090"))
        expect_actives(True, True, ["http"], 1)

        # sockets are inaccessible to users, only to cockpit-tls
        for s in ["http.sock", "https-factory.sock"]:
            # "! nc ... || exit 1": the command only succeeds if nc fails to connect
            out = m.execute(f"su -c '! nc -U /run/cockpit/wsinstance/{s} 2>&1 || exit 1' admin")
            self.assertIn("Permission denied", out)

    @skipImage("OSTree doesn't use systemd units", "fedora-coreos")
    def testHttpsInstanceDoS(self):
        # Requests far more https instances than allowed and checks that their number stays
        # bounded, that the initial instance survives, and that new instances can be
        # created again once some old ones got stopped.
        machine = self.machine
        # prevent generating core dump artifacts
        machine.execute("echo core > /proc/sys/kernel/core_pattern")
        machine.start_cockpit(tls=True)

        # some netcat versions need an explicit shutdown option, others default to shutting down and don't have -N
        nc_help = machine.execute("nc -h 2>&1")
        n_opt = "-N" if "-N" in nc_help else ""

        first_reply = machine.execute("curl --silent -k --head https://127.0.0.1:9090")
        self.assertIn("HTTP/1.1 200 OK", first_reply)

        # number of https instances is bounded (DoS prevention)
        # with MaxTasks=200 and 2 threads per ws instance we should have a
        # rough limit of 100 instances, so at some point curl should start failing
        flood = ("su -s /bin/sh -c 'RC=1; for i in `seq 120`; do "
                 f"  echo -n $i | nc {n_opt} -U /run/cockpit/wsinstance/https-factory.sock;"
                 "  curl --silent --head --max-time 5 --unix /run/cockpit/wsinstance/https@$i.sock http://dummy > /dev/null || RC=0; "
                 "done; exit $RC' cockpit-ws")
        machine.execute(flood)

        for unit_type in ["socket", "service"]:
            count_active = (f"systemctl --no-legend list-units -t {unit_type} --state=active "
                            "'cockpit-wsinstance-https@*' | wc -l")
            active = int(machine.execute(count_active).strip())
            self.assertGreater(active, 45)
            self.assertLess(active, 110)

        count_failed = "systemctl --no-legend list-units --state=failed 'cockpit-wsinstance-https@*' | wc -l"
        failed = int(machine.execute(count_failed).strip())
        self.assertGreater(failed, 0)
        self.assertLess(failed, 75)  # services and sockets

        self.allow_journal_messages(".*cockpit-ws.*dumped core.*")
        self.allow_journal_messages(".*Error creating thread: Resource temporarily unavailable.*")

        # initial instance still works
        reply = machine.execute("curl --silent --show-error -k --head https://127.0.0.1:9090")
        self.assertIn("HTTP/1.1 200 OK", reply)

        # can launch new instances after freeing up some old ones
        machine.execute("systemctl stop cockpit-wsinstance-https@30 cockpit-wsinstance-https@31 cockpit-wsinstance-https@32")
        machine.execute(f"echo -n new | nc {n_opt} -U /run/cockpit/wsinstance/https-factory.sock")
        reply = machine.execute("curl --silent --show-error --head --unix /run/cockpit/wsinstance/https@new.sock http://dummy")
        self.assertIn("HTTP/1.1 200 OK", reply)

    @skipBrowser("Firefox needs proper cert and CA", "firefox")
    def testTls(self):
        # Checks cockpit's TLS setup: certificate extensions, refused protocols and ciphers,
        # the supported certificate file layouts in /etc/cockpit/ws-certs.d, response headers and
        # cookie flags of an https session, and the http -> https redirect behaviour.
        m = self.machine
        b = self.browser

        # Start Cockpit with TLS
        m.start_cockpit(tls=True)

        # A normal TLS connection works
        output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
        m.message(output)
        self.assertIn("DONE", output)

        # has proper keyUsage and SAN (both with sscg and with self-signed)
        output = m.execute("openssl s_client -showcerts -connect 172.27.0.15:9090 |"
                           "openssl x509 -noout -ext keyUsage,extendedKeyUsage,subjectAltName")
        # keyUsage
        self.assertIn("Digital Signature", output)
        self.assertIn("Key Encipherment", output)
        # extendedKeyUsage
        self.assertIn("TLS Web Server Authentication", output)
        # SAN
        self.assertIn("IP Address:127.0.0.1", output)
        self.assertIn("DNS:localhost", output)

        # SSLv3 should not work
        output = m.execute('openssl s_client -connect 172.27.0.15:9090 -ssl3 2>&1 || true')
        self.assertNotIn("DONE", output)

        # Some operating systems fail SSL3 on the server side
        self.assertRegex(output, "Secure Renegotiation IS NOT supported|"
                         "ssl handshake failure|"
                         "[uU]nknown option.* -ssl3|"
                         "null ssl method passed|"
                         "wrong version number")

        # RC4 should not work
        # the leading "!" makes the command succeed only if openssl fails
        output = m.execute('! openssl s_client -connect 172.27.0.15:9090 -tls1_2 -cipher RC4 2>&1')
        self.assertNotIn("DONE", output)
        self.assertRegex(
            output, r"no cipher match|no ciphers available|ssl handshake failure|Cipher is \(NONE\)")

        # get along with read-only config directory, as long as certificate exists
        # this does not work on coreos as the user/group IDs are not mapped correctly
        if not m.ostree_image:
            m.stop_cockpit()
            # read-only bind mount of a copy over /etc/cockpit
            m.execute("cp -a /etc/cockpit /tmp; mount -o bind -r /tmp/cockpit /etc/cockpit")
            m.start_cockpit(tls=True)
            self.assertIn("HTTP/1.1 200 OK", m.execute("curl -k --head https://127.0.0.1:9090"))
            m.execute("umount /etc/cockpit")

        # Install a certificate chain, and make it accessible for cockpit
        m.upload(["verify/files/cert-chain.cert", "verify/files/cert-chain.key"], "/etc/cockpit/ws-certs.d")
        m.execute("cd /etc/cockpit/ws-certs.d; chgrp cockpit-ws cert-chain.key; ! selinuxenabled || chcon --type etc_t cert-chain.cert cert-chain.key")

        def check_cert_chain():
            '''Restart cockpit and assert that it serves the uploaded localhost certificate chain'''
            # This should also reset the file context
            m.restart_cockpit()
            output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
            self.assertIn("DONE", output)
            # the optional "/" and spaces accept different subject formattings in the openssl output
            self.assertRegex(output, "s:/?CN *= *localhost")
            self.assertRegex(output, "1 s:/?OU *= *Intermediate")

        check_cert_chain()

        # *.crt file also works
        m.execute("mv /etc/cockpit/ws-certs.d/cert-chain.cert /etc/cockpit/ws-certs.d/cert-chain.crt")
        check_cert_chain()

        # backwards compat: merged cert+key file also still works with cockpit-tls (but not any more with cockpit-ws/container)
        if not m.ostree_image:
            m.execute("""cat /etc/cockpit/ws-certs.d/cert-chain.key >> /etc/cockpit/ws-certs.d/cert-chain.crt
                         chmod 640 /etc/cockpit/ws-certs.d/cert-chain.crt
                         chown root:cockpit-ws /etc/cockpit/ws-certs.d/cert-chain.crt
                         rm /etc/cockpit/ws-certs.d/cert-chain.key""")
            check_cert_chain()

        # certmonger generated certificate; asciibetically later than the above
        # not all images have certmonger
        if m.image not in ["debian-stable", "debian-testing", "fedora-coreos", "arch"]:
            hostname = m.execute("hostname --fqdn").strip()
            m.execute(f"getcert request -f /etc/cockpit/ws-certs.d/monger.cert -k /etc/cockpit/ws-certs.d/monger.key -D {hostname} --ca=local --wait")
            # cert generation succeeded, and it is being tracked
            self.assertIn("MONITORING", m.execute("getcert list"))
            self.assertIn("/etc/cockpit/ws-certs.d/monger.cert",
                          m.execute(f"{self.libexecdir}/cockpit-certificate-ensure --check"))
            m.restart_cockpit()
            output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
            self.assertIn("DONE", output)
            self.assertRegex(output, f"s:/?CN *= {hostname}")
            self.assertRegex(output, "i:/?CN *= Local Signing Authority.*")

        # login handler: correct password
        # the session cookie is stored in cockpit.jar and sent along with the next request
        m.execute("curl -k -c cockpit.jar -s --head --header 'Authorization: Basic {}' https://127.0.0.1:9090/cockpit/login".format(
            base64.b64encode(b"admin:foobar").decode(), ))
        headers = m.execute("curl -k --head -b cockpit.jar -s https://127.0.0.1:9090/")
        self.assertIn(
            "default-src 'self' https://127.0.0.1:9090; connect-src 'self' https://127.0.0.1:9090 wss://127.0.0.1:9090", headers)
        self.assertIn("Access-Control-Allow-Origin: https://127.0.0.1:9090", headers)
        # CORP and Frame-Options are also set for dynamic paths
        self.assertIn("Cross-Origin-Resource-Policy: same-origin", headers)
        self.assertIn("X-Frame-Options: sameorigin", headers)

        self.allow_journal_messages(
            ".*Peer failed to perform TLS handshake",
            ".*Peer sent fatal TLS alert:.*",
            ".*invalid base64 data in Basic header",
            ".*Error performing TLS handshake: No supported cipher suites have been found.",
            ".*Error performing TLS handshake: Could not negotiate a supported cipher suite.")

        # check the Debian smoke test
        m.upload(["../tools/debian/tests/smoke"], "/tmp")
        m.execute("/tmp/smoke")

        b.ignore_ssl_certificate_errors(True)
        self.login_and_go("/system", tls=True)
        cookie = b.cookie("cockpit")
        # cookie should be marked as secure
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])
        self.assertEqual(cookie["sameSite"], "Strict")
        # same after logout
        b.logout()
        cookie = b.cookie("cockpit")
        self.assertEqual(cookie["value"], "deleted")
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])
        self.assertEqual(cookie["sameSite"], "Strict")

        # http on localhost should not redirect to https
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://127.0.0.1:9090"))
        # http on other IP should redirect to https
        output = m.execute("curl --head http://172.27.0.15:9090")
        self.assertIn("HTTP/1.1 301 Moved Permanently", output)
        self.assertIn("Location: https://172.27.0.15:9090/", output)
        # enable AllowUnencrypted, this disables redirect
        m.execute('mkdir -p /etc/cockpit/ && echo "[WebService]\nAllowUnencrypted=true" > /etc/cockpit/cockpit.conf')
        m.restart_cockpit()
        # now it should not redirect
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://127.0.0.1:9090"))
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://172.27.0.15:9090"))

    def testConfigOrigins(self):
        # With an extra origin listed in cockpit.conf, a websocket handshake which
        # announces that origin has to be answered, on both socket paths.
        machine = self.machine
        machine.execute(
            'mkdir -p /etc/cockpit/ && echo "[WebService]\nOrigins = http://other-origin:9090 http://localhost:9090" > /etc/cockpit/cockpit.conf')
        machine.start_cockpit()

        # websocket upgrade request coming from the additional origin
        handshake = ('curl -s -f -N -H "Connection: Upgrade" -H "Upgrade: websocket" '
                     '-H "Origin: http://other-origin:9090" -H "Host: localhost:9090" '
                     '-H "Sec-Websocket-Key: 3sc2c9IzwRUc3BlSIYwtSA==" -H "Sec-Websocket-Version: 13" ')

        # the socket should answer at /cockpit/socket as well as at /socket
        for path in ["/cockpit/socket", "/socket"]:
            reply = machine.execute(f"{handshake}http://localhost:9090{path}")
            self.assertIn('"no-session"', reply)

        self.allow_journal_messages('peer did not close io when expected')

    @skipImage("OSTree doesn't have cockpit-ws", "fedora-coreos")
    def test100YearsCert(self):
        # A self-signed certificate with a far-future expiry date must be replaced by
        # cockpit-certificate-ensure, while a certificate with a regular expiry must be kept.
        m = self.machine

        selfsign = '/etc/cockpit/ws-certs.d/0-self-signed.cert'
        helper = f'{self.libexecdir}/cockpit-certificate-helper'
        ensure = f'{self.libexecdir}/cockpit-certificate-ensure'

        seconds_per_day = 24 * 60 * 60

        def cert_expiry():
            '''Return the expiry date of the self-signed certificate in seconds since the epoch'''
            # This is a bit annoying due to the date format OpenSSL uses (which Python
            # can't trivially parse) and the question of locales, so let date(1) on the
            # machine do the conversion
            return int(m.execute(f'date -d "$(openssl x509 -enddate -noout < {selfsign} | cut -f2 -d=)" +%s'))

        # Ensure things are as we expect them to be
        m.execute(f'grep DAYS=395 {helper}')
        m.execute(f'test ! -e {selfsign}')

        # Generate a 100 years expiry certificate
        self.sed_file('s/DAYS=395/DAYS=36500/', helper)
        m.execute(f'grep DAYS=36500 {helper}')  # double-check
        m.execute(ensure)

        # Verify the expiry date to be in the far-future; assertGreater shows both
        # values when the comparison fails
        self.assertGreater(cert_expiry(), time.time() + 99 * 365 * seconds_per_day)

        # Put things back: avoid problematic multiple invocations of .sed_file()
        m.execute(f'sed -i s/DAYS=36500/DAYS=395/ {helper}')
        m.execute(f'grep DAYS=395 {helper}')  # double-check

        # Run ensure again and make sure we get a new certificate
        m.execute('touch /tmp/timestamp')
        m.execute(ensure)
        m.execute(f'test {selfsign} -nt /tmp/timestamp')
        m.execute('rm /tmp/timestamp')

        # Check that the expiry is less than 420 days
        # See https://github.com/sgallagher/sscg/pull/28 for why 420
        self.assertLess(cert_expiry(), time.time() + 420 * seconds_per_day)

        # Run ensure again and make sure we *don't* get a new certificate
        m.execute('touch /tmp/timestamp')
        m.execute(ensure)
        m.execute(f'test ! {selfsign} -nt /tmp/timestamp')
        m.execute('rm /tmp/timestamp')

    @skipImage("OSTree doesn't use systemd units", "fedora-coreos")
    def testSocket(self):
        # Checks that /etc/issue.d/cockpit.issue and /etc/motd.d/cockpit follow the state and the
        # port of cockpit.socket, and that a customized ListenStream= configuration is served.
        m = self.machine

        # non-admin user
        m.execute("useradd user")

        # enable no-password login for 'admin' and 'user'
        m.execute("passwd -d admin")
        m.execute("passwd -d user")

        self.sed_file('$ a\\\nPermitEmptyPasswords yes', '/etc/ssh/sshd_config',
                      'systemctl restart sshd.service')

        # Needs https://github.com/linux-pam/linux-pam/pull/292 (or PAM 1.5.0)
        # Only on the images listed here the non-admin 'user' is expected to see the motd text.
        # The image does not change during the test, so this is determined once.
        old_pam = m.image in ['centos-8-stream', 'debian-stable', 'debian-testing', 'ubuntu-2004', 'ubuntu-stable', 'rhel-8-4', 'rhel-8-5', 'rhel-8-6']

        def assertInOrNot(string, result, expected):
            '''Assert that string is contained in result if expected is true, and absent otherwise'''
            if expected:
                self.assertIn(string, result)
            else:
                self.assertNotIn(string, result)

        def checkMotdForUser(string, user, expected):
            '''Log in as user via ssh and check the output for string'''
            result = m.execute(f"ssh -o StrictHostKeyChecking=no -n {user}@localhost")
            assertInOrNot(string, result, expected)

        def checkMotdContent(string, expected=True):
            '''Check the presence or absence of string in the issue file and in the motd'''
            # check issue (should be exactly the same as motd)
            assertInOrNot(string, m.execute("cat /etc/issue.d/cockpit.issue"), expected)

            # check motd as 'root' (via cat) and 'admin' and 'user' (via ssh)
            assertInOrNot(string, m.execute("cat /etc/motd.d/cockpit"), expected)
            checkMotdForUser(string, expected=expected, user='admin')
            # 'user' never gets to see the string unless the image has an old PAM
            checkMotdForUser(string, expected=old_pam and expected, user='user')

        checkMotdContent('systemctl')
        checkMotdContent(':9090/', expected=False)
        m.start_cockpit()

        checkMotdContent(':9090/')
        checkMotdContent('systemctl', expected=False)
        m.execute("systemctl stop cockpit.socket")

        # Change port according to documentation: https://cockpit-project.org/guide/latest/listen.html
        m.execute('! selinuxenabled || semanage port -m -t websm_port_t -p tcp 443')
        m.execute(
            'mkdir -p /etc/systemd/system/cockpit.socket.d/ && printf "[Socket]\nListenStream=\nListenStream=/run/cockpit/sock\nListenStream=443" > /etc/systemd/system/cockpit.socket.d/listen.conf')

        checkMotdContent('systemctl')
        checkMotdContent(':9090/', expected=False)
        checkMotdContent(':443/', expected=False)
        m.start_cockpit(tls=True)

        checkMotdContent('systemctl', expected=False)
        checkMotdContent(':9090/', expected=False)
        checkMotdContent(':443/')

        # cockpit answers on port 443 and on the unix socket, but no longer on 9090
        output = m.execute('curl -k https://localhost 2>&1 || true')
        self.assertIn('Loading...', output)
        output = m.execute('curl -k --unix /run/cockpit/sock https://dummy 2>&1 || true')
        self.assertIn('Loading...', output)

        output = m.execute('curl -k https://localhost:9090 2>&1 || true')
        self.assertIn('Connection refused', output)

        self.allow_journal_messages(".*Peer failed to perform TLS handshake")

    @skipImage("Can't remove/upgrade packages on OSTree", "fedora-coreos")
    def testWsPackage(self):
        # Checks how upgrading, removing and freshly installing the cockpit-ws package
        # treats the /etc/motd.d/cockpit and /etc/issue.d/cockpit.issue symlinks.
        machine = self.machine
        image = machine.image
        dpkg_based = image.startswith(("debian", "ubuntu"))
        on_arch = image == "arch"

        if dpkg_based:
            # clean up debug symbols, they get in the way of upgrading
            machine.execute("dpkg --purge cockpit-ws-dbgsym")
        elif image.startswith("rhel"):
            # subscription-manager-cockpit depends on cockpit-ws
            machine.execute("rpm --erase subscription-manager-cockpit")

        def install():
            '''Install or upgrade to the locally built package'''
            if dpkg_based:
                machine.execute("dpkg --install /var/tmp/build-results/cockpit-ws_*.deb")
                return
            if on_arch:
                machine.execute("pacman -U --noconfirm /var/tmp/build-results/cockpit-*.pkg.tar.zst")
                return
            # rpm: verify the installed package (if any) before, and the new one after the upgrade
            machine.execute("if rpm -q cockpit-ws; then rpm --verify cockpit-ws; fi")
            machine.execute("rpm --upgrade --force /var/tmp/build-results/cockpit-ws-*.rpm")
            machine.execute("rpm --verify cockpit-ws")

        def remove():
            '''Remove the cockpit packages'''
            if dpkg_based:
                machine.execute("dpkg --purge cockpit cockpit-ws")
                return
            if on_arch:
                machine.execute("pacman -Rdd --noconfirm cockpit")
                return
            machine.execute("rpm --erase cockpit cockpit-ws")

        def assert_dynamic_links():
            '''Assert that the motd and the issue link both point to /run/cockpit/motd'''
            for link in ["/etc/motd.d/cockpit", "/etc/issue.d/cockpit.issue"]:
                self.assertEqual(machine.execute(f"readlink {link}").strip(), "/run/cockpit/motd")

        # upgrade from distro version (our images have cockpit-ws preinstalled) sets up dynamic motd/issue symlink
        assert_dynamic_links()

        # package upgrade keeps them
        install()
        assert_dynamic_links()

        # HACK: On Arch Linux the symlink is overwritten, bug?
        if not on_arch:
            # manual change/removal is respected on upgrade
            machine.execute("ln -sf /dev/null /etc/motd.d/cockpit; rm /etc/issue.d/cockpit.issue")
            install()
            self.assertEqual(machine.execute("readlink /etc/motd.d/cockpit").strip(), "/dev/null")
            machine.execute("test ! -e /etc/issue.d/cockpit.issue")

        # removing the package cleans up the links
        remove()
        for link in ["/etc/motd.d/cockpit", "/etc/issue.d/cockpit.issue"]:
            machine.execute(f"test ! -e {link}")

        # fresh install (most of our test images have cockpit-ws preinstalled, so the first test above does not cover that)
        install()
        assert_dynamic_links()

    @skipImage("OSTree doesn't have cockpit-ws", "fedora-coreos")
    def testCommandline(self):
        # Sends requests with an oversized Authorization header through cockpit-tls and to a
        # manually started cockpit-ws, and checks the --port/--address options of the latter.
        machine = self.machine

        # curl option which adds a header with 7000 digits, to make the request large
        big_header = '''-H "Authorization: Negotiate $(printf '%0.7000i' 1)"'''

        machine.start_cockpit(tls=True)
        # Large requests are processed correctly with plain HTTP through cockpit-tls
        page = machine.execute(f'curl -s -S {big_header} http://localhost:9090/')
        self.assertIn('id="login"', page)

        # Large requests are processed correctly with TLS through cockpit-tls
        page = machine.execute(f'curl -s -S -k {big_header} https://localhost:9090/')
        self.assertIn('id="login"', page)
        machine.stop_cockpit()

        # start over with a configuration that only sets a custom login title
        self.restore_dir("/etc/cockpit")
        machine.execute("rm -f /etc/cockpit/ws-certs.d/* /etc/cockpit/cockpit.conf")
        machine.write("/etc/cockpit/cockpit.conf", "[WebService]\nLoginTitle = A Custom Title\n")

        # the certificate directory was emptied above, so a certificate has to be generated
        machine.execute(f"{self.libexecdir}/cockpit-certificate-ensure")
        cert_files = machine.execute("ls /etc/cockpit/ws-certs.d/*")
        self.assertTrue(cert_files)

        # run cockpit-ws directly in the background, bound to localhost only
        machine.execute(f"{self.ws_executable} --port 9000 --address 127.0.0.1 0<&- &>/dev/null &")

        # The port may not be available immediately, so wait for it
        wait(lambda: 'A Custom Title' in machine.execute('curl -s -k https://localhost:9000/'))

        # not reachable through the other address
        refused = machine.execute('curl -s -S -k https://172.27.0.15:9000/ 2>&1 || true')
        self.assertIn('Connection refused', refused)

        # Large requests are processed correctly with plain HTTP
        page = machine.execute(f'curl -s -S {big_header} http://localhost:9000/')
        self.assertIn('A Custom Title', page)

        # Large requests are processed correctly with TLS
        page = machine.execute(f'curl -s -S -k {big_header} https://localhost:9000/')
        self.assertIn('A Custom Title', page)

    def testHeadRequest(self):
        '''Check the responses to HTTP HEAD requests for the static, login, socket and channel handlers'''
        machine = self.machine
        machine.start_cockpit()

        def head(path, credentials=None):
            # run "curl --head" on the given path; credentials (bytes "user:password") are sent as HTTP Basic auth
            auth_opt = ""
            if credentials is not None:
                token = base64.b64encode(credentials).decode()
                auth_opt = f" --header 'Authorization: Basic {token}'"
            return machine.execute(f"curl -s --head{auth_opt} http://172.27.0.15:9090{path}")

        # static handler
        response = head("/cockpit/static/login.html")
        for expected in ("HTTP/1.1 200 OK\r\n",
                         "Content-Type: text/html\r\n",
                         "Cross-Origin-Resource-Policy: same-origin\r\n",
                         "X-Frame-Options: sameorigin\r\n",
                         "Content-Length: "):
            self.assertIn(expected, response)
        # login.html is not always accessible as a file (e.g. on CoreOS), so just assert a reasonable content length
        size = int(response.partition('Content-Length: ')[2].split()[0])
        self.assertGreater(size, 5000)
        self.assertLess(size, 100000)

        # login handler: wrong password
        response = head("/cockpit/login", b"admin:hahawrong")
        self.assertRegex(response, r"HTTP/1.1 (401 Authentication failed|403 Permission denied)\r\n")
        self.assertNotIn("Set-Cookie:", response)

        # login handler: correct password
        response = head("/cockpit/login", b"admin:foobar")
        self.assertIn("HTTP/1.1 200 OK\r\n", response)
        self.assertIn("Set-Cookie: cockpit", response)

        # socket handler refuses HEAD (as it makes little sense on sockets), and the
        # external channel handler is unauthenticated here; both answer with 404
        for path in ("/cockpit/socket", "/cockpit+123/channel/foo"):
            self.assertIn("HTTP/1.1 404 Not Found\r\n", head(path))

    @skipImage("ssh root login not allowed", "fedora-coreos")
    def testFlowControl(self):
        '''Read a block device through the speed playground and check that memory use stays bounded'''
        machine = self.machine
        browser = self.browser

        self.login_and_go("/playground/speed", user="root")

        # Check the speed playground page
        browser.switch_to_top()
        browser.go("/playground/speed")
        browser.enter_page("/playground/speed")

        # the page shows a process id once it is ready
        browser.wait_text_not("#pid", "")
        reported_pid = browser.text("#pid")

        browser.set_val("#read-path", "/dev/vda")
        browser.click("#read-sideband")

        # let the transfer run for a while after the first speed value shows up
        browser.wait_text_not("#speed", "")
        time.sleep(20)
        statm = machine.execute(f"cat /proc/{reported_pid}/statm")
        # NOTE(review): per proc(5) the first statm field is the total program size in pages,
        # not the resident set; confirm that this is the metric the limit was chosen for
        page_count = int(statm.partition(" ")[0])

        # This fails when flow control is not present
        self.assertLess(page_count, 250000)

    @skipImage("OSTree doesn't have cockpit-ws", "fedora-coreos")
    def testLocalSession(self):
        m = self.machine

        # start ws with --local-session, let it spawn bridge; ensure that this works without /etc/cockpit/
        m.spawn("su - -c 'G_MESSAGES_DEBUG=all XDG_CONFIG_DIRS=/usr/local %s -p 9999 -a 127.0.0.90 "
                "--local-session=cockpit-bridge' admin" % self.ws_executable,
                "cockpit-ws-local")
        m.wait_for_cockpit_running('127.0.0.90', 9999)
        # System frame should work directly, no login page
        out = m.execute("curl --compressed http://127.0.0.90:9999/cockpit/@localhost/system/index.html")
        self.assertIn('id="overview"', out)

        # shut it down, wait until it is gone
        m.execute("pkill -ef cockpit-ws")

        # start ws with --local-session and existing running bridge
        script = '''#!/bin/bash -eu
coproc env G_MESSAGES_DEBUG=all cockpit-bridge
G_MESSAGES_DEBUG=all XDG_CONFIG_DIRS=/usr/local %s -p 9999 -a 127.0.0.90 --local-session=- <&${COPROC[0]} >&${COPROC[1]}
''' % self.ws_executable
        m.execute(["tee", "/tmp/local.sh"], input=script)
        m.execute("chmod a+x /tmp/local.sh")
        m.spawn("su - -c /tmp/local.sh admin", "local.sh")
        m.wait_for_cockpit_running('127.0.0.90', 9999)

        # System frame should work directly, no login page
        out = m.execute("curl --compressed http://127.0.0.90:9999/cockpit/@localhost/system/index.html")
        self.assertIn('id="overview"', out)

        self.allow_journal_messages("couldn't register polkit authentication agent.*")

    @skipImage("OSTree doesn't have cockpit-ws", "fedora-coreos")
    @skipImage("Kernel does not allow user namespaces", "debian-stable", "debian-testing")
    def testCockpitDesktop(self):
        m = self.machine

        cases = [(['/cockpit/@localhost/system/index.html', 'system', 'system/index', 'system/'],
                  ['id="overview"']
                  ),
                 (['/cockpit/@localhost/network/firewall.html', 'network/firewall'],
                  ['div id="firewall"', 'script src="firewall.js"']
                  ),
                 (['/cockpit/@localhost/playground/react-patterns.html', 'playground/react-patterns'],
                  ['script src="react-patterns.js"']
                  ),
                 # no ssh host
                 (['/cockpit/@localhost/manifests.json'],
                  ['"system"', '"Overview"']
                  ),
                 # remote ssh host
                 (['/cockpit/@localhost/manifests.json test1@localhost'],
                  ['"system"', '"Overview"', '"HACK"']
                  )
                 ]

        # prepare fake ssh target; to verify that we really use that, fake dashboard manifest
        m.execute("""set -e; useradd test1
                  [ -f ~admin/.ssh/id_rsa ] || su -c "ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa" admin
                  mkdir -p ~test1/.ssh ~test1/.local/share/cockpit/dashboard
                  echo '{ "version": "42", "dashboard": { "index": { "label": "HACK" } } }' > ~test1/.local/share/cockpit/dashboard/manifest.json
                  cp ~admin/.ssh/id_rsa.pub ~test1/.ssh/authorized_keys
                  ssh-keyscan localhost >> ~admin/.ssh/known_hosts
                  chown admin:admin ~admin/.ssh/known_hosts
                  chown -R test1:test1 ~test1
                  su -c "ssh test1@localhost cockpit-bridge --packages" admin | grep -q test1.*dashboard  # validate setup
                  """)

        for (pages, asserts) in cases:
            for page in pages:
                m.execute(f'''su - -c 'BROWSER="curl --silent --compressed -o /tmp/out.html" {self.libexecdir}/cockpit-desktop {page}' admin''')

                out = m.execute("cat /tmp/out.html")
                for a in asserts:
                    self.assertIn(a, out)

                # should clean up processes
                self.assertEqual(m.execute("! pgrep -a cockpit-ws && ! pgrep -a cockpit-bridge"), "")

        # cockpit-desktop can start a privileged bridge through polkit
        # we don't have an agent, so just allow the privilege without interactive authentication
        polkit_loc = "/etc/polkit-1/rules.d" if m.image == "arch" else "/etc/polkit-1/localauthority/50-local.d"
        m.write(f"{polkit_loc}/test.pkla", r"""
[Testing without an agent]
Identity=unix-user:admin
Action=org.cockpit-project.cockpit.root-bridge
ResultAny=yes
ResultInactive=yes
ResultActive=yes""")
        m.write(r"/tmp/browser.sh", """#!/bin/sh -e
curl --silent --compressed -o /tmp/out.html "$@"
# wait until privileged bridge starts
until pgrep -f cockpit-bridge.*--privileged; do sleep 1; done
""")
        m.execute("chmod 755 /tmp/browser.sh")
        m.execute(f"su - -c 'BROWSER=/tmp/browser.sh {self.libexecdir}/cockpit-desktop system' admin")
        self.assertIn('id="overview"', m.execute("cat /tmp/out.html"))

        self.allow_journal_messages("couldn't register polkit authentication agent.*")
        self.allow_journal_messages("Refusing to render service to dead parents.")
        self.allow_journal_messages(".*No authentication agent found.*")
        self.allow_journal_messages(".*Peer failed to perform TLS handshake.*")
        self.allow_journal_messages(r".*cannot reauthorize identity\(s\): unix-user:.*")

    @skipBrowser("Firefox needs proper cert and CA", "firefox")
    def testReverseProxy(self):
        '''Run cockpit-ws behind socat acting as TLS and as plain HTTP reverse proxy

        With --no-tls the WebSocket must be refused after login because of the mismatching
        Origin; with --for-tls-proxy the login must work, with secure cookies and https://
        URLs in the Content-Security-Policy.
        '''
        machine = self.machine
        browser = self.browser

        def submit_login():
            # load the System page through the TLS proxy and send the admin credentials
            browser.open(f"https://{browser.address}:{browser.port}/system")
            browser.wait_visible("#login")
            browser.set_val("#login-user-input", "admin")
            browser.set_val("#login-password-input", "foobar")
            browser.click('#login-button')
            browser.expect_load()

        def assert_plain_http_not_redirected():
            # the plain http proxy must answer directly instead of redirecting to https
            for address in ("127.0.0.1", "172.27.0.15"):
                reply = machine.execute(f"curl --silent --head http://{address}:9091")
                self.assertIn("HTTP/1.1 200 OK", reply)

        self.ostree_setup_ws()

        # set up a poor man's reverse TLS proxy with socat
        machine.upload(["../src/bridge/mock-server.crt", "../src/bridge/mock-server.key"], "/tmp")
        tls_listener = "OPENSSL-LISTEN:9090,reuseaddr,fork,cert=/tmp/mock-server.crt,key=/tmp/mock-server.key,verify=0"
        machine.spawn(f"socat {tls_listener} TCP:localhost:9099", "socat-tls.log")

        # and another proxy for plain http
        machine.spawn("socat TCP-LISTEN:9091,reuseaddr,fork TCP:localhost:9099", "socat.log")

        # ws with plain --no-tls should fail after login with mismatching Origin (expected http, got https)
        machine.spawn(f"su -s /bin/sh -c '{self.ws_executable} --no-tls -p 9099' cockpit-wsinstance",
                      "ws-notls.log")
        machine.wait_for_cockpit_running(tls=True)

        browser.ignore_ssl_certificate_errors(True)
        submit_login()

        # the browser must log the refused WebSocket handshake ...
        handshake_refused = 'Error during WebSocket handshake: Unexpected response code: 403'
        wait(lambda: any(handshake_refused in entry for entry in self.browser.get_js_log()))

        # ... and ws must log the reason
        wait(lambda: machine.execute("grep 'received request from bad Origin' /var/log/ws-notls.log"))

        # sanity check: unencrypted http through SSL proxy does not work
        machine.execute("! curl http://localhost:9090")

        # does not redirect to https (through plain http proxy)
        assert_plain_http_not_redirected()

        machine.execute("pkill -e cockpit-ws; while pgrep -a cockpit-ws; do sleep 1; done")
        # this page failure is really noisy
        self.allow_restart_journal_messages()
        for pattern in (".*No authentication agent found.*",
                        "couldn't register polkit authentication agent.*",
                        "received request from bad Origin.*",
                        ".*invalid handshake.*"):
            self.allow_journal_messages(pattern)
        for pattern in (".*received unsupported version in init message.*",
                        ".*received message before init.*",
                        "Error reading machine id"):
            self.allow_browser_errors(pattern)

        # ws with --for-tls-proxy accepts only https origins, thus should work
        machine.spawn(f"su -s /bin/sh -c '{self.ws_executable} --for-tls-proxy -p 9099 -a 127.0.0.1' cockpit-wsinstance",
                      "ws-fortlsproxy.log")
        machine.wait_for_cockpit_running(tls=True)
        submit_login()
        browser.wait_visible('#content')
        browser.enter_page("/system")

        # cookie should be marked as secure, as for the browser it's https
        session_cookie = browser.cookie("cockpit")
        self.assertTrue(session_cookie["httpOnly"])
        self.assertTrue(session_cookie["secure"])
        browser.logout()
        # deleted cookie after logout should be marked as secure
        deleted_cookie = browser.cookie("cockpit")
        self.assertEqual(deleted_cookie["value"], "deleted")
        self.assertTrue(deleted_cookie["httpOnly"])
        self.assertTrue(deleted_cookie["secure"])

        # should have https:// URLs in Content-Security-Policy
        csp_headers = machine.execute("curl --insecure --head https://localhost:9090/")
        self.assertIn("Content-Security-Policy: connect-src 'self' https://localhost:9090 wss://localhost:9090;",
                      csp_headers)

        # sanity check: does not redirect to https (through plain http proxy) -- this isn't a supported mode, though!
        assert_plain_http_not_redirected()

    def testCaCert(self):
        '''Check that /ca.cer is served exactly when the self-signed CA file exists'''
        machine = self.machine
        ca_file = "/etc/cockpit/ws-certs.d/0-self-signed-ca.pem"
        fetch_ca = "curl -sfS http://localhost:9090/ca.cer"
        fake_cert = "FAKE CERT FOR TESTING\n"

        machine.start_cockpit()
        if not machine.ostree_image:
            # Really start Cockpit to make sure it has generated all its certificates.
            machine.execute("systemctl start cockpit")

        # Without a CA certificate the download must fail.
        machine.execute(f"rm -f {ca_file}")
        machine.execute(f"! {fetch_ca}")

        # Now make one up and check that it is served.
        machine.write(ca_file, fake_cert)
        self.assertEqual(machine.execute(fetch_ca), fake_cert)


@skipDistroPackage()
class TestReverseProxy(MachineCase):
    '''Tests for running Cockpit behind an nginx reverse proxy

    nginx listens on port 443 with the "alice" certificate and proxies to Cockpit on
    127.0.0.1:9090. The tests reach port 443 through the address in machine.forward["443"].
    '''

    provision = {
        "0": {"forward": {"443": 8443}}
    }

    def setUp(self):
        super().setUp()
        m = self.machine

        m.execute("if firewall-cmd --state >/dev/null 2>&1; then firewall-cmd --add-service https; fi")

        m.upload(["../src/tls/ca/alice.pem", "../src/tls/ca/alice.key"], "/etc/pki")

        # let Cockpit accept the proxy's origin and take the protocol from the header which nginx sets
        m.write("/etc/cockpit/cockpit.conf", """[WebService]
Origins = https://%(origin)s wss://%(origin)s
ProtocolHeader = X-Forwarded-Proto
""" % {"origin": m.forward["443"]}, append=True)

        m.execute("setsebool -P httpd_can_network_connect on")
        self.allow_journal_messages("audit.*bool=httpd_can_network_connect.*val=1.*")

    def _write_nginx_config(self, backend_scheme, docroot=None):
        '''Write /etc/nginx/conf.d/cockpit.conf which proxies port 443 to Cockpit

        backend_scheme ("https" or "http") selects how nginx talks to 127.0.0.1:9090.
        If docroot is given, a "root" directive with that path is added to the server block.
        '''
        m = self.machine
        # goes right after server_name, so that the file has no extra line without a docroot
        root_directive = f"\n    root {docroot};" if docroot else ""
        m.write("/etc/nginx/conf.d/cockpit.conf", """
server {
    listen 443 ssl;
    server_name %(origin)s;%(root)s

    ssl_certificate "/etc/pki/alice.pem";
    ssl_certificate_key "/etc/pki/alice.key";

    location / {
        # Required to proxy the connection to Cockpit
        proxy_pass %(scheme)s://127.0.0.1:9090;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Required for web sockets to function
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # Pass ETag header from Cockpit to clients.
        # See: https://github.com/cockpit-project/cockpit/issues/5239
        gzip off;
    }
}
""" % {"origin": m.forward["443"], "root": root_directive, "scheme": backend_scheme})

    def _curl_alice(self, path, *curl_opts):
        '''Fetch https://alice:<forwarded port><path> with a local curl and return its output

        Uses --resolve so that SNI matches the certificate's CN, and validates against the
        test CA. Additional curl options can be given in curl_opts. The returned bytes are
        curl's stdout and stderr combined, i.e. they include the --verbose log.
        '''
        (https_host, https_port) = self.machine.forward["443"].split(':')
        return subprocess.check_output(
            ["curl", "--verbose", *curl_opts,
             "--resolve", f"alice:{https_port}:{https_host}",
             "--cacert", os.path.join(TEST_DIR, "../src/tls/ca/ca.pem"),
             f"https://alice:{https_port}{path}"],
            stderr=subprocess.STDOUT)

    def checkCockpitOnProxy(self, urlroot=""):
        '''Check through the proxy that the login page is served and that logging in works

        urlroot is the path prefix below which Cockpit is reachable (empty for "/").
        '''
        b = self.browser

        # should use nginx' certificate, not cockpit's
        out = self._curl_alice(f"{urlroot}/cockpit/static/login.html", "--head")
        self.assertIn(b"HTTP/1.1 200 OK", out)
        self.assertIn(b"subject: CN=alice; DC=COCKPIT", out)

        # works with browser (but we can't set our CA)
        (https_host, https_port) = self.machine.forward["443"].split(':')
        b.ignore_ssl_certificate_errors(True)
        b.open(f"https://{https_host}:{https_port}{urlroot}/system")
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load()
        b.wait_visible('#content')
        b.logout()

        self.allow_restart_journal_messages()

    @skipImage("nginx not installed", "centos-8-stream", "debian-stable", "debian-testing", "fedora-coreos",
               "rhel-8-4", "rhel-8-5", "rhel-8-6", "rhel-9-0", "ubuntu-stable", "ubuntu-2004", "arch")
    @skipBrowser("Firefox needs proper cert and CA", "firefox")
    def testNginxTLS(self):
        '''test proxying to Cockpit with TLS

        As described on https://github.com/cockpit-project/cockpit/wiki/Proxying-Cockpit-over-NGINX
        This use case is important for proxying a remote machine.
        '''
        m = self.machine

        # /srv/www is the document root for the non-cockpit files served further down
        self._write_nginx_config("https", docroot="/srv/www")

        m.execute("systemctl start nginx")
        m.start_cockpit(tls=True)
        self.checkCockpitOnProxy()

        # now test with UrlRoot
        m.write("/etc/cockpit/cockpit.conf", "UrlRoot = cockpit-root\n", append=True)
        m.execute("systemctl stop cockpit.service")
        self.sed_file("s_location /_location /cockpit-root_", "/etc/nginx/conf.d/cockpit.conf",
                      "systemctl restart nginx")
        self.checkCockpitOnProxy(urlroot="/cockpit-root")

        # get a non-cockpit file from the server
        m.execute("mkdir -p /srv/www/embed-cockpit")
        m.upload(["verify/files/embed-cockpit/index.html",
                  "verify/files/embed-cockpit/embed.js",
                  "verify/files/embed-cockpit/embed.css"],
                 "/srv/www/embed-cockpit/")
        m.execute("if selinuxenabled 2>&1; then chcon -R -t httpd_sys_content_t /srv/www; fi")

        out = self._curl_alice("/embed-cockpit/embed.css")
        self.assertIn(b"HTTP/1.1 200 OK", out)
        self.assertIn(b"#embed-links", out)

        # embedding
        (https_host, https_port) = self.machine.forward["443"].split(':')
        b = self.browser
        b.ignore_ssl_certificate_errors(True)
        b.open(f"https://{https_host}:{https_port}/embed-cockpit/index.html")
        b.set_val("#embed-address", f"https://{https_host}:{https_port}/cockpit-root")
        b.click("#embed-full")
        b.wait_visible("iframe[name='embed-full'][loaded]")
        b.switch_to_frame("embed-full")

        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.expect_load_frame("embed-full")
        b.wait_visible('article.system-health')

    @skipImage("nginx not installed", "centos-8-stream", "debian-stable", "debian-testing", "fedora-coreos",
               "rhel-8-4", "rhel-8-5", "rhel-8-6", "rhel-9-0", "ubuntu-stable", "ubuntu-2004", "arch")
    @skipBrowser("Firefox needs proper cert and CA", "firefox")
    def testNginxNoTLS(self):
        '''test proxying to Cockpit with plain HTTP

        This can be done when nginx and cockpit run on the same machine.
        '''
        m = self.machine

        self._write_nginx_config("http")

        m.execute("systemctl start nginx")

        # start cockpit-ws in proxy mode, skip all the ws-certs.d/ steps
        m.spawn(f"su -s /bin/sh -c '{self.libexecdir}/cockpit-ws --address=127.0.0.1 --for-tls-proxy' cockpit-wsinstance", "ws.log")
        m.wait_for_cockpit_running()

        self.checkCockpitOnProxy()

        self.allow_journal_messages("couldn't register polkit authentication agent.*")


# Run the tests through test_main() when this file is executed as a script
if __name__ == '__main__':
    test_main()
